package com.code.ape.codeape.inhos.mq.listener;

import com.alibaba.fastjson.JSON;
import com.code.ape.codeape.common.core.util.AssertUtil;
import com.code.ape.codeape.common.elasticsearch.ResetElasticSearchClient;
import com.code.ape.codeape.inhos.api.entity.constant.MqConstant;
import com.code.ape.codeape.inhos.api.entity.constant.PatInfoConstant;
import com.code.ape.codeape.inhos.api.vo.PatInfoVO;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.rest.RestStatus;
import org.springframework.stereotype.Component;

/**
 * @author 公众号：码猿技术专栏
 * @url: www.java-family.cn
 * @description 处理患者办理出院的消息，触发冷热分离
 * 踩坑：1. 保证消费幂等
 * 		2. 消息消费失败超过最大重试次数则会进入死信队列，因此需要监听死信队列再次处理
 */
@RocketMQMessageListener(topic = MqConstant.PAT_TOPIC,
		selectorExpression=MqConstant.TAG_COLD,
		consumerGroup = "pat-cold-consumer",
		//并发消费的线程，默认64，这里调整为5
		consumeThreadMax = 5,
		//消费失败的重试次数，这里调整为5次，达到5次后进入死信队列，人工干预
		maxReconsumeTimes=5)
@Slf4j
@Component
@RequiredArgsConstructor
public class PatColdMqListener implements RocketMQListener<String> {

	private final ResetElasticSearchClient resetElasticSearchClient;


	@SneakyThrows
	@Override
	public void onMessage(String message) {
		if (log.isDebugEnabled())
			log.debug("消费端接收到一条出院冷热分离的消息：{}",message);
		PatInfoVO patInfoVO = JSON.parseObject(message, PatInfoVO.class);
		// step1 向ElasticSearch中添加数据，此操作具备幂等性，不存在则插入，存在则更新
		IndexResponse indexResponse = resetElasticSearchClient.add(PatInfoConstant.PAT_INFO_INDEX_NAME, patInfoVO.getId().toString(), message);
		//删除成功||404（已经删除了）
		AssertUtil.isTrue(indexResponse.status().equals(RestStatus.OK)||indexResponse.status().equals(RestStatus.NOT_FOUND),"添加冷数据到ES中失败！");
	}
}
